﻿#pragma once

#include "../redis/redis.hh"
#include "../util/box_std_allocator.hh"
#include "../util/object_pool.hh"
#include "../util/spsc_queue_pair.hpp"
#include <atomic>
#include <thread>
#include <unordered_map>
#include <vector>

struct redisContext;
struct redisReply;

namespace kratos {
namespace service {
class ServiceBox;
}
} // namespace kratos

namespace kratos {

namespace redis {

class ResultImpl;

/**
 * Redis工作线程, 每个主机一个工作线程
 */
class RedisWorker {
  std::thread worker_; ///< 线程
  corelib::SPSCQueue<ResultImpl *>
      main_spsc_queue_; ///< 写：主线程， 读：工作线程
  corelib::SPSCQueue<ResultImpl *>
      worker_spsc_queue_; ///< 写：工作线程，读：主线程
  /**
   * 双队列读写包装对象.
   */
  corelib::SPSCQueuePair<ResultImpl *> main_queue_{&worker_spsc_queue_,
                                                   &main_spsc_queue_};
  /**
   * 双队列读写包装对象.
   */
  corelib::SPSCQueuePair<ResultImpl *> worker_queue_{&main_spsc_queue_,
                                                     &worker_spsc_queue_};

  std::atomic_bool running_{false};   ///< 运行标志
  redisContext *ctx_{nullptr};        ///< redisContext
  std::string host_;                  ///< 主机地址
  int port_{6379};                    ///< 主机监听端口
  std::string user_;                  ///< 用户名
  std::string passwd_;                ///< 密码
  service::ServiceBox *box_{nullptr}; ///< 服务容器

public:
  /**
   * 构造.
   *
   * \param box 服务容器
   */
  RedisWorker(service::ServiceBox *box);
  /**
   * 析构.
   *
   */
  ~RedisWorker();
  /**
   * 启动一个Redis工作线程.
   *
   * \param host 地址
   * \param port 端口
   * \param user 用户名
   * \param passwd 密码
   * \return true或false
   */
  auto start(const std::string &host, int port, const std::string &user,
             const std::string &passwd) -> bool;
  /**
   * 关闭工作线程.
   *
   * \return true或false
   */
  auto stop() -> bool;
  /**
   * 异步执行一个Redis命令.
   *
   * \param command 命令
   * \param timeout 超时，毫秒
   * \param handler 命令处理器
   * \param user_data 用户数据
   * \return true或false
   */
  auto do_command(const std::string &command, std::time_t timeout,
                  RedisHandler handler, std::uint64_t user_data) -> bool;
  /**
   * 协程执行一个Redis命令.
   *
   * \param command 命令
   * \param timeout 超时，毫秒
   * \return true或false
   */
  auto do_command_co(const std::string &command, std::time_t timeout)
      -> std::unique_ptr<Result>;
  /**
   * 异步执行一个Redis命令.
   *
   * \param cmd_vec 命令数组
   * \param timeout 超时，毫秒
   * \param handler 命令处理器
   * \param user_data 用户数据
   * \return true或false
   */
  auto do_command(const CommandVector &cmd_vec, std::time_t timeout,
                  RedisHandler handler, std::uint64_t user_data) -> bool;
  /**
   * 协程执行一个Redis命令.
   *
   * \param cmd_vec 命令数组
   * \param timeout 超时，毫秒
   * \return true或false
   */
  auto do_command_co(const CommandVector &cmd_vec, std::time_t timeout)
      -> std::unique_ptr<Result>;
  /**
   * 主循环，调用命令处理器，检查超时
   *
   * \param ms 当前时间戳
   * \return 执行的命令处理器数量
   */
  auto update(std::time_t ms) -> std::size_t;

private:
  /**
   * 连接Redis主机.
   *
   * \param host 地址
   * \param port 端口
   * \param user 用户名
   * \param passwd 密码
   * \return RedisContext
   */
  auto connect_redis(const std::string &host, int port, const std::string &user,
                     const std::string &passwd) -> redisContext *;
  /**
   * 连接Redis主机.
   *
   * \param host 地址
   * \param port 端口
   * \param user 用户名
   * \param passwd 密码
   * \return RedisContext
   */
  auto connect_redis_internal(const std::string &host, int port,
                              const std::string &user,
                              const std::string &passwd) -> redisContext *;
  /**
   * 执行Redis命令.
   *
   * \param command 命令
   * \param result 结果
   * \param reconnect 是否自动重连
   * \return redisReply
   */
  auto do_redis_command(const std::string &command, ResultImpl *result,
                        bool reconnect) -> redisReply *;

  /**
   * 执行Redis命令, pipeline模式
   *
   * \param cmd_vec 命令数组
   * \param result 结果
   * \param reconnect 是否自动重连
   * \return true成功, false失败
   */
  auto do_redis_command_batch(const CommandVector& cmd_vec, ResultImpl *result,
                        bool reconnect) -> bool;
};

/**
 * Redis实现
 */
class RedisImpl : public Redis {
  using WorkerVector = std::vector<kratos::unique_pool_ptr<RedisWorker>>;
  using WorkerMap = std::unordered_map<
      std::string, WorkerVector, std::hash<std::string>,
      std::equal_to<std::string>,
      kratos::service::Allocator<std::pair<const std::string, WorkerVector>>>;
  service::ServiceBox *box_{nullptr}; ///< 服务容器
  WorkerMap worker_map_;              ///< {连接名,工作线程}

public:
  RedisImpl(service::ServiceBox *box);
  virtual ~RedisImpl();
  virtual auto start() -> bool override;
  virtual auto stop() -> bool override;
  virtual auto update(std::time_t ms) -> std::size_t override;
  virtual auto add_host(const std::string &name, const std::string &host,
                        int port, const std::string &user,
                        const std::string &passwd) -> bool override;
  virtual auto do_command(const std::string &name, const std::string &command,
                          std::time_t timeout, RedisHandler handler,
                          std::uint64_t user_data) -> bool override;
  virtual auto do_command_co(const std::string &name,
                             const std::string &command, std::time_t timeout)
      -> std::unique_ptr<Result> override;
  virtual auto do_command(const std::string &name, const CommandVector &cmd_vec,
                          std::time_t timeout, RedisHandler handler,
                          std::uint64_t user_data) -> bool override;
  virtual auto do_command_co(const std::string &name,
                             const CommandVector &cmd_vec, std::time_t timeout)
      -> std::unique_ptr<Result> override;
};

/**
 * Redis执行结果
 */
class ResultImpl : public Result {
  using ReplyVector = kratos::service::PoolVector<void*>;

  CommandVector cmd_vec_;                      ///< 命令数组
  std::string error_;                          ///< 错误描述
  RedisError error_code_{RedisError::SUCCESS}; ///< 状态
  ReplyVector reply_vec_;                      ///< hiredis redisReply*
  std::uint64_t coid_{0};                      ///< 协程ID
  RedisHandler handler_;                       ///< 处理器
  std::time_t timeout_{1000};                  ///< 执行超时
  std::uint64_t user_data_{0};                 ///< 用户数据
  std::size_t cur_index_{0};                   ///< 当前索引

public:
  ResultImpl();
  virtual ~ResultImpl();
  virtual auto get_command() const -> const std::string & override;
  virtual auto get_reply() const -> void * override;
  virtual auto get_command_count() const -> std::size_t override;
  virtual auto get_command_array() const -> const CommandVector & override;
  virtual auto get_command(std::size_t index) const
      -> const std::string & override;
  virtual auto get_reply(std::size_t index) const -> void * override;
  virtual auto next_reply() const -> bool override;
  virtual auto is_success() const -> bool override;
  virtual auto get_error() const -> const std::string & override;
  virtual auto get_error_code() const -> RedisError override;
  virtual auto get_return(std::string &s) const -> bool override;
  virtual auto get_return(std::vector<std::string> &v) const -> bool override;
  virtual auto get_return(std::list<std::string> &v) const -> bool override;
  virtual auto get_return(std::unordered_map<std::string, std::string> &m) const
      -> bool override;
  virtual auto get_return(std::unordered_set<std::string> &s) const
      -> bool override;
  virtual auto get_return(std::size_t &size) const -> bool override;
  virtual auto get_return(bool &b) const -> bool override;

public:
  /**
   * 设置错误描述.
   *
   * \param string 错误描述
   * \return
   */
  auto set_error(const char *string) -> void;
  /**
   * 设置Redis返回.
   *
   * \param reply Redis返回
   * \return
   */
  auto set_reply(void *reply) -> void;
  /**
   * 获取超时，毫秒
   *
   * \return 超时，毫秒
   */
  auto get_timeout() -> std::time_t;
  /**
   * 设置超时，毫秒.
   *
   * \param ms 超时，毫秒
   * \return
   */
  auto set_timeout(std::time_t ms) -> void;
  /**
   * 设置异步处理器.
   *
   * \param handler 异步处理器
   * \return
   */
  auto set_handler(RedisHandler handler) -> void;
  /**
   * 获取异步处理器.
   *
   * \return 异步处理器
   */
  auto get_handler() -> RedisHandler;
  /**
   * 设置Redis命令.
   *
   * \param command Redis命令
   * \return
   */
  auto set_command(const std::string &command) -> void;
  /**
   * 获取协程ID.
   *
   * \return 协程ID
   */
  auto get_coid() -> std::uint64_t;
  /**
   * 设置协程ID.
   *
   * \param coid 协程ID
   * \return
   */
  auto set_coid(std::uint64_t coid) -> void;
  /**
   * 设置错误码.
   *
   * \param code 错误码
   * \return
   */
  auto set_error_code(RedisError code) -> void;
  /**
   * @brief 设置用户数据
   * @param user_data 用户数据
   * @return
   */
  auto set_user_data(std::uint64_t user_data) -> void;
  /**
   * @brief 获取用户数据
   * @return 用户数据
   */
  auto get_user_data() -> std::uint64_t;
};

} // namespace redis
} // namespace kratos
